Skip to content

refactor(job-queue): drop legacy limiter methods, fix QueuedExecutionStrategy release, rename releaseClaim#511

Merged
sroussey merged 11 commits into
mainfrom
claude/standardize-job-queue-1kQBU
May 18, 2026
Merged

refactor(job-queue): drop legacy limiter methods, fix QueuedExecutionStrategy release, rename releaseClaim#511
sroussey merged 11 commits into
mainfrom
claude/standardize-job-queue-1kQBU

Conversation

@sroussey
Copy link
Copy Markdown
Collaborator

@sroussey sroussey force-pushed the claude/standardize-job-queue-1kQBU branch from 4737b39 to fec8ccd Compare May 16, 2026 03:38
@pkg-pr-new
Copy link
Copy Markdown

pkg-pr-new Bot commented May 16, 2026

Open in StackBlitz

@workglow/cli

npm i https://pkg.pr.new/@workglow/cli@511

@workglow/ai

npm i https://pkg.pr.new/@workglow/ai@511

@workglow/browser-control

npm i https://pkg.pr.new/@workglow/browser-control@511

@workglow/indexeddb

npm i https://pkg.pr.new/@workglow/indexeddb@511

@workglow/javascript

npm i https://pkg.pr.new/@workglow/javascript@511

@workglow/job-queue

npm i https://pkg.pr.new/@workglow/job-queue@511

@workglow/knowledge-base

npm i https://pkg.pr.new/@workglow/knowledge-base@511

@workglow/mcp

npm i https://pkg.pr.new/@workglow/mcp@511

@workglow/storage

npm i https://pkg.pr.new/@workglow/storage@511

@workglow/task-graph

npm i https://pkg.pr.new/@workglow/task-graph@511

@workglow/tasks

npm i https://pkg.pr.new/@workglow/tasks@511

@workglow/util

npm i https://pkg.pr.new/@workglow/util@511

workglow

npm i https://pkg.pr.new/workglow@511

@workglow/anthropic

npm i https://pkg.pr.new/@workglow/anthropic@511

@workglow/bun-webview

npm i https://pkg.pr.new/@workglow/bun-webview@511

@workglow/chrome-ai

npm i https://pkg.pr.new/@workglow/chrome-ai@511

@workglow/electron

npm i https://pkg.pr.new/@workglow/electron@511

@workglow/google-gemini

npm i https://pkg.pr.new/@workglow/google-gemini@511

@workglow/huggingface-inference

npm i https://pkg.pr.new/@workglow/huggingface-inference@511

@workglow/huggingface-transformers

npm i https://pkg.pr.new/@workglow/huggingface-transformers@511

@workglow/node-llama-cpp

npm i https://pkg.pr.new/@workglow/node-llama-cpp@511

@workglow/ollama

npm i https://pkg.pr.new/@workglow/ollama@511

@workglow/openai

npm i https://pkg.pr.new/@workglow/openai@511

@workglow/playwright

npm i https://pkg.pr.new/@workglow/playwright@511

@workglow/postgres

npm i https://pkg.pr.new/@workglow/postgres@511

@workglow/sqlite

npm i https://pkg.pr.new/@workglow/sqlite@511

@workglow/supabase

npm i https://pkg.pr.new/@workglow/supabase@511

@workglow/tf-mediapipe

npm i https://pkg.pr.new/@workglow/tf-mediapipe@511

commit: 1e2f15e

@pkg-pr-new
Copy link
Copy Markdown

pkg-pr-new Bot commented May 16, 2026

Open in StackBlitz

@workglow/cli

npm i https://pkg.pr.new/@workglow/cli@511

@workglow/ai

npm i https://pkg.pr.new/@workglow/ai@511

@workglow/browser-control

npm i https://pkg.pr.new/@workglow/browser-control@511

@workglow/indexeddb

npm i https://pkg.pr.new/@workglow/indexeddb@511

@workglow/javascript

npm i https://pkg.pr.new/@workglow/javascript@511

@workglow/job-queue

npm i https://pkg.pr.new/@workglow/job-queue@511

@workglow/knowledge-base

npm i https://pkg.pr.new/@workglow/knowledge-base@511

@workglow/mcp

npm i https://pkg.pr.new/@workglow/mcp@511

@workglow/storage

npm i https://pkg.pr.new/@workglow/storage@511

@workglow/task-graph

npm i https://pkg.pr.new/@workglow/task-graph@511

@workglow/tasks

npm i https://pkg.pr.new/@workglow/tasks@511

@workglow/util

npm i https://pkg.pr.new/@workglow/util@511

workglow

npm i https://pkg.pr.new/workglow@511

@workglow/anthropic

npm i https://pkg.pr.new/@workglow/anthropic@511

@workglow/bun-webview

npm i https://pkg.pr.new/@workglow/bun-webview@511

@workglow/chrome-ai

npm i https://pkg.pr.new/@workglow/chrome-ai@511

@workglow/electron

npm i https://pkg.pr.new/@workglow/electron@511

@workglow/google-gemini

npm i https://pkg.pr.new/@workglow/google-gemini@511

@workglow/huggingface-inference

npm i https://pkg.pr.new/@workglow/huggingface-inference@511

@workglow/huggingface-transformers

npm i https://pkg.pr.new/@workglow/huggingface-transformers@511

@workglow/node-llama-cpp

npm i https://pkg.pr.new/@workglow/node-llama-cpp@511

@workglow/ollama

npm i https://pkg.pr.new/@workglow/ollama@511

@workglow/openai

npm i https://pkg.pr.new/@workglow/openai@511

@workglow/playwright

npm i https://pkg.pr.new/@workglow/playwright@511

@workglow/postgres

npm i https://pkg.pr.new/@workglow/postgres@511

@workglow/sqlite

npm i https://pkg.pr.new/@workglow/sqlite@511

@workglow/supabase

npm i https://pkg.pr.new/@workglow/supabase@511

@workglow/tf-mediapipe

npm i https://pkg.pr.new/@workglow/tf-mediapipe@511

commit: fec8ccd

@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 16, 2026

Coverage Report

Status Category Percentage Covered / Total
🔵 Lines 61.77% 22491 / 36409
🔵 Statements 61.64% 23272 / 37749
🔵 Functions 62.75% 4247 / 6768
🔵 Branches 50.35% 10876 / 21599
File CoverageNo changed files found.
Generated in workflow #2292 for commit 1e2f15e by the Vitest Coverage Report Action

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors the job-queue subsystem toward a leased-claim model and a split “message queue + job store” facade, while removing legacy limiter APIs and the ABORTING job state. It updates all storage providers/migrations and adjusts tests/callers to the new naming (visible_at, attempts, max_attempts, lease_owner, etc.) and limiter semantics.

Changes:

  • Introduces IMessageQueue, IJobStore, and IClaim (plus factories/wrappers) and updates JobQueueClient/Server/Worker to use the split abstractions (with a legacy storage wrapper).
  • Replaces ABORTING with abort_requested_at, adds lease expiry + extendLease, and renames core queue columns (run_aftervisible_at, max_retriesmax_attempts, etc.) across providers + migrations.
  • Removes legacy limiter methods (canProceed/recordJobStart/recordJobCompletion) in favor of tryAcquire/release/complete, and updates limiters/tests accordingly.

Reviewed changes

Copilot reviewed 62 out of 63 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
providers/supabase/src/job-queue/SupabaseQueueStorage.ts Supabase queue schema rename + lease/abort support + claim release rename.
providers/supabase/src/job-queue/SupabaseMessageQueue.ts New Supabase IMessageQueue wrapper with claims + buffered writes.
providers/supabase/src/job-queue/SupabaseJobStore.ts New Supabase IJobStore wrapper with buffered result/error staging.
providers/supabase/src/job-queue/createSupabaseQueue.ts Factory returning paired messageQueue/jobStore/core for Supabase.
providers/supabase/src/job-queue/common.ts Re-exports new Supabase queue façade and factory.
providers/sqlite/src/migrations/sqliteQueueMigrations.ts Updates SQLite queue migrations for renamed columns + lease/abort columns.
providers/sqlite/src/job-queue/SqliteQueueStorage.ts SQLite queue storage updated for renamed columns + lease reclaim + extendLease + abort behavior.
providers/sqlite/src/job-queue/SqliteMessageQueue.ts New SQLite IMessageQueue wrapper with claims + buffered writes.
providers/sqlite/src/job-queue/SqliteJobStore.ts New SQLite IJobStore wrapper with buffered result/error staging.
providers/sqlite/src/job-queue/createSqliteQueue.ts Factory returning paired messageQueue/jobStore/core for SQLite.
providers/sqlite/src/job-queue/common.ts Re-exports new SQLite queue façade and factory.
providers/postgres/src/migrations/postgresQueueMigrations.ts Updates Postgres queue migrations for renamed columns + lease/abort columns.
providers/postgres/src/job-queue/PostgresQueueStorage.ts Postgres queue storage updated for renamed columns + lease reclaim + extendLease + abort behavior.
providers/postgres/src/job-queue/PostgresMessageQueue.ts New Postgres IMessageQueue wrapper with claims + buffered writes.
providers/postgres/src/job-queue/PostgresJobStore.ts New Postgres IJobStore wrapper with buffered result/error staging.
providers/postgres/src/job-queue/createPostgresQueue.ts Factory returning paired messageQueue/jobStore/core for Postgres.
providers/postgres/src/job-queue/common.ts Re-exports new Postgres queue façade and factory.
packages/test/src/test/task/FetchTask.test.ts Updates test client API from submit to send.
packages/test/src/test/task-graph-job-queue/genericTaskGraphJobQueueTests.ts Updates queue client options (maxRetriesmaxAttempts, submitsend).
packages/test/src/test/job-queue/TelemetryQueueStorage.test.ts Updates job shape field (run_aftervisible_at).
packages/test/src/test/job-queue/RateLimiter.test.ts Removes tests for deleted legacy limiter methods.
packages/test/src/test/job-queue/Limiters.test.ts Updates limiter tests to tryAcquire/release/clear semantics.
packages/test/src/test/job-queue/InMemoryJobQueue.test.ts Adds coverage for abort_requested_at + lease expiry + DLQ + (claimed) prefetch scenarios.
packages/test/src/test/job-queue/genericQueueStorageSubscriptionTests.ts Updates job shape field (run_aftervisible_at) throughout.
packages/test/src/test/job-queue/genericPrefixedQueueStorageTests.ts Updates job shape field (run_aftervisible_at) and related comments.
packages/test/src/test/job-queue/genericJobQueueTests.ts Updates client API (send), retries naming, and ABORTING expectations.
packages/tasks/src/task/FetchUrlTask.ts Updates queue client API to send and maxAttempts.
packages/job-queue/src/queue-storage/wrapQueueStorage.ts New adapter that wraps legacy IQueueStorage into messageQueue/jobStore + claims.
packages/job-queue/src/queue-storage/TelemetryQueueStorage.ts Traces new extendLease and renames releasereleaseClaim; updates next signature.
packages/job-queue/src/queue-storage/IQueueStorage.ts Removes ABORTING, renames core fields, adds lease/abort fields + extendLease + releaseClaim.
packages/job-queue/src/queue-storage/InMemoryQueueStorage.ts Implements lease expiry reclaim, abort_requested_at behavior, extendLease, renamed fields.
packages/job-queue/src/queue-storage/InMemoryMessageQueue.ts New in-memory IMessageQueue wrapper with claims + buffered writes.
packages/job-queue/src/queue-storage/InMemoryJobStore.ts New in-memory IJobStore wrapper with buffered result/error staging.
packages/job-queue/src/queue-storage/IMessageQueue.ts New IMessageQueue interface + send options.
packages/job-queue/src/queue-storage/IJobStore.ts New IJobStore interface for read/mutation operations.
packages/job-queue/src/queue-storage/IClaim.ts New claim abstraction with ack/retry/fail/extendLease.
packages/job-queue/src/queue-storage/createInMemoryQueue.ts Factory returning paired in-memory messageQueue/jobStore/core.
packages/job-queue/src/limiter/RateLimiter.ts Removes legacy APIs; adds complete() no-op; relies on storage atomic reserve.
packages/job-queue/src/limiter/NullLimiter.ts Removes legacy APIs; adds complete() no-op.
packages/job-queue/src/limiter/ILimiter.ts Removes legacy APIs; adds complete(token) semantics distinct from release(token).
packages/job-queue/src/limiter/EvenlySpacedRateLimiter.ts Removes legacy start/completion hooks; adds complete() no-op; keeps atomic tryAcquire.
packages/job-queue/src/limiter/DelayLimiter.ts Removes legacy APIs; adds complete() no-op; keeps rollback-capable release(token).
packages/job-queue/src/limiter/ConcurrencyLimiter.ts Removes legacy APIs; implements complete() to decrement running count.
packages/job-queue/src/limiter/CompositeLimiter.ts Removes legacy APIs; adds complete() propagation to children.
packages/job-queue/src/job/MessageQueueClient.ts New producer-only client abstraction over IMessageQueue.
packages/job-queue/src/job/JobStorageConverters.ts Renames fields in storage↔class conversions; maps worker_id→lease_owner; adds lease/abort fields.
packages/job-queue/src/job/JobQueueWorker.ts Switches to messageQueue/jobStore, adds lease extension, DLQ support, prefetch loop, abort polling by abort_requested_at.
packages/job-queue/src/job/JobQueueServer.ts Switches to messageQueue/jobStore with legacy wrapper; removes restart fixup in favor of lease reclaim; adds DLQ plumbing.
packages/job-queue/src/job/JobQueueEventListeners.ts Renames retry event payload to visibleAt.
packages/job-queue/src/job/JobQueueClient.ts Switches to messageQueue/jobStore with legacy wrapper; renames submit→send and runAfter→delaySeconds, maxRetries→maxAttempts.
packages/job-queue/src/job/Job.ts Renames fields (visibleAt/attempts/maxAttempts/leaseOwner) and adds abort/lease timestamps.
packages/job-queue/src/job/DeadLetter.ts Adds dead-letter payload type.
packages/job-queue/src/common.ts Re-exports new queue abstractions, factories, and MessageQueueClient/DeadLetter.
packages/indexeddb/src/migrations/indexedDbQueueMigrations.ts Renames IndexedDB index to visible_at.
packages/indexeddb/src/job-queue/IndexedDbQueueStorage.ts Updates IndexedDB queue for renamed fields + lease reclaim + extendLease + abort semantics.
packages/indexeddb/src/job-queue/IndexedDbMessageQueue.ts New IndexedDB IMessageQueue wrapper with claims + buffered writes.
packages/indexeddb/src/job-queue/IndexedDbJobStore.ts New IndexedDB IJobStore wrapper with buffered result/error staging.
packages/indexeddb/src/job-queue/createIndexedDbQueue.ts Factory returning paired IndexedDB messageQueue/jobStore/core.
packages/indexeddb/src/job-queue/common.ts Re-exports new IndexedDB queue façade and factory.
packages/ai/src/job/AiJob.ts Removes ABORTING checks; relies on AbortSignal only.
packages/ai/src/execution/QueuedExecutionStrategy.ts Updates limiter acquisition to return token and release on exit.
examples/web/src/status/QueueStatus.tsx Removes ABORTING status display.
.gitignore Ignores .claude/worktrees/.
Comments suppressed due to low confidence (1)

providers/supabase/src/job-queue/SupabaseQueueStorage.ts:430

  • extendLease() interpolates ms into SQL via Number(ms) but does not validate ms is finite and non-negative. This can lead to invalid SQL or shortening leases unexpectedly. Consider validating ms similarly to id (Number.isFinite, >=0) before issuing the UPDATE.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +832 to +836
const existing = await this.jobStore.get(job.id);
const claim = this.requireClaim(job.id);
if (claim) {
await claim.fail();
}
await this.storage.complete(this.classToStorage(job));
this.events.emit("job_retry", job.id, job.runAfter);
const claim = this.requireClaim(job.id);
const delaySeconds = Math.max(0, Math.floor((job.visibleAt.getTime() - Date.now()) / 1000));
Comment on lines +758 to +761
/** Internal — resolve the active claim for a job id, throw if missing. */
private requireClaim(jobId: unknown): IClaim<JobStorageFormat<Input, Output>> | undefined {
return this.activeClaims.get(jobId);
}
);
} finally {
await limiter.recordJobCompletion();
await limiter.release(token);
Comment on lines +379 to +383
UPDATE ${this.tableName}
SET status = '${JobStatus.PROCESSING}', last_ran_at = NOW() AT TIME ZONE 'UTC', worker_id = '${escapedWorkerId}'
SET status = '${JobStatus.PROCESSING}',
last_attempted_at = NOW() AT TIME ZONE 'UTC',
lease_owner = '${escapedWorkerId}',
lease_expires_at = NOW() AT TIME ZONE 'UTC' + (${Number(leaseMs)} * INTERVAL '1 millisecond')
Comment on lines 34 to 37
options?: {
/** @deprecated renamed to includeLeaseOwner; both names accepted */
readonly includeWorkerId?: boolean;
}
Comment on lines +219 to +223
EXECUTE 'ALTER TABLE ${tableName} RENAME COLUMN run_after TO visible_at';
EXECUTE 'ALTER TABLE ${tableName} RENAME COLUMN last_ran_at TO last_attempted_at';
EXECUTE 'ALTER TABLE ${tableName} RENAME COLUMN run_attempts TO attempts';
EXECUTE 'ALTER TABLE ${tableName} RENAME COLUMN max_retries TO max_attempts';
EXECUTE 'ALTER TABLE ${tableName} RENAME COLUMN worker_id TO lease_owner';
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 62 out of 63 changed files in this pull request and generated 5 comments.

Comment on lines +206 to +211
// ---------------------------------------------------------------------------
// Minimal in-memory IMessageQueue for DLQ testing
// ---------------------------------------------------------------------------

import type { IClaim, IMessageQueue, MessageId } from "@workglow/job-queue";

job.status = JobStatus.FAILED;
job.abort_requested_at = now;
job.completed_at = now;
await this.complete(job);
Comment on lines 622 to 628
try {
await this.validateJobState(job);
} catch (validationErr) {
// Validation failed before we ran any actual work — release THIS
// limiter slot (by token, not by recency) so it doesn't count toward
// the rate limit and we don't accidentally release another worker's
// slot.
try {
await this.limiter.release(limiterToken);
slotReleased = true;
} catch {
// best-effort
}
// Throw — the outer finally block's limiter.complete() will release
// the slot. Do NOT call limiter.release() here too; that would
// double-decrement the counter and admit one extra concurrent job.
throw validationErr;
Comment on lines +238 to +242
if (!current) return;
// Re-use complete() but preserve attempts by subtracting 1 to offset the
// mandatory increment in legacy storage backends.
await this.storage.complete({ ...current, status, attempts: (current.attempts ?? 1) - 1 });
}
Comment on lines +231 to +234
`ALTER TABLE ${this.tableName} RENAME COLUMN last_ran_at TO last_attempted_at`,
`ALTER TABLE ${this.tableName} RENAME COLUMN run_attempts TO attempts`,
`ALTER TABLE ${this.tableName} RENAME COLUMN max_retries TO max_attempts`,
`ALTER TABLE ${this.tableName} RENAME COLUMN worker_id TO lease_owner`,
claude added 7 commits May 18, 2026 22:33
…elease methods

Add `complete(token)` to `ILimiter` as the normal-path post-job release.
Semantics differ from `release(token)`: release undoes a reservation as if
the job never ran; complete finalises a reservation for a job that actually
executed (allowing rate-limiters to record the slot as consumed rather than
retracting it from the window).

Remove the legacy `acquire`, `recordJobStart`, and `waitUntilAvailable`
methods that conflated check+record into a non-atomic sequence. Replace with
the existing atomic `tryAcquire` + new `complete` pair. All limiter
implementations (`ConcurrencyLimiter`, `RateLimiter`, `DelayLimiter`,
`EvenlySpacedRateLimiter`, `NullLimiter`, `CompositeLimiter`) and
`QueuedExecutionStrategy` updated accordingly.

Merge the `RateLimiter.test.ts` suite into `Limiters.test.ts` and expand
coverage of the new `complete` / `release` contracts.

https://claude.ai/code/session_01MVz87icx5fozgdfEPSQih3
… lease expiry

Remove the ABORTING job status. Abort is now a two-phase signal:
- PENDING jobs are immediately marked FAILED with abort_requested_at set.
- PROCESSING jobs get abort_requested_at written; the running worker
  observes it via checkForAbortingJobs() and triggers its AbortController.
  The job transitions to FAILED when the worker's execute() rejects.

Add two new columns/fields across all backends (InMemory, SQLite, Postgres,
Supabase, IndexedDB) and the JobStorageFormat type:
- abort_requested_at — timestamp set when abort() is called
- lease_expires_at — tracks when the current PROCESSING lease expires,
  enabling crash-recovery re-claim without a separate lease table

Add extendLease(id, workerId, ms) to IQueueStorage; implement across all
backends with guard that lease_owner must match. Add opt-in
`extendLeaseWhileRunning` option to JobQueueWorker that sets a periodic
setInterval to renew the lease at 50% of leaseMs while the job runs.

Guard the delaySeconds precision: use sub-second resolution (remove
Math.floor) so short-interval tests do not re-claim before the lease floor.

New tests in InMemoryJobQueue.test.ts cover: PENDING→FAILED abort path,
PROCESSING abort_requested_at, lease expiry re-claim, extendLease survival,
and extendLease ownership guard.

https://claude.ai/code/session_01MVz87icx5fozgdfEPSQih3
Rename storage columns across all backends (InMemory, SQLite, Postgres,
Supabase, IndexedDB), JobStorageFormat, Job, and all test helpers:

  run_after         → visible_at
  last_ran_at       → last_attempted_at
  run_attempts      → attempts
  max_retries       → max_attempts   (default bumped from 20 to 10)
  worker_id         → lease_owner

Rename the producer API on JobQueueClient:
  enqueue()         → send()
  enqueueBatch()    → sendBatch()

Add idempotent v3 migration for Postgres that checks each column
individually with IF EXISTS so a partially-applied migration doesn't skip
all remaining renames. Also sets DEFAULT 10 when renaming max_retries.

Add idempotent migrations for SQLite with per-column existence guards.
Fix Postgres migration schema scope check to use current_schema().
Fix Supabase rename error handling to re-throw non-42703 errors per
rename step rather than short-circuiting the loop.

https://claude.ai/code/session_01MVz87icx5fozgdfEPSQih3
…t InMemory; add wrapQueueStorage

Split the monolithic IQueueStorage into three focused interfaces:

  IMessageQueue<Body>  — send, sendBatch, receive, releaseClaim, and
                         optional subscribeToChanges. Pure transport.
  IClaim<Body>         — per-message handle: ack, fail, retry, extendLease.
  IJobStore<I,O>       — job-record CRUD: get, peek, size, getByRunId,
                         saveProgress, saveResult, saveError, saveStatus,
                         abort, delete, deleteAll, deleteByStatusAndAge.

Implement InMemoryMessageQueue and InMemoryJobStore backed by the existing
InMemoryQueueStorage core. Add createInMemoryQueue() factory that returns
{ messageQueue, jobStore }.

Add wrapQueueStorage(storage) adapter that wraps any IQueueStorage into
the split pair, enabling legacy backends to work with the new worker/server
while native implementations are built out. A PendingWrite buffer defers
saveResult/saveError until claim settlement so complete() is only called
once per job (avoiding double-incrementing attempts).

Add MessageQueueClient as a thin wrapper around IMessageQueue for callers
that only need to send messages without the full JobQueueClient machinery.

Refactor JobQueueServer, JobQueueWorker, and JobQueueClient to accept
{ messageQueue, jobStore } instead of a raw IQueueStorage. Legacy
IQueueStorage constructors are preserved via wrapQueueStorage so existing
code continues to work unchanged.

Add IQueueStorage.saveStatus?() as an optional method; wrapQueueStorage
delegates to it when present (avoiding the complete()+attempts-1 hack).

https://claude.ai/code/session_01MVz87icx5fozgdfEPSQih3
…ackends

Add native IMessageQueue + IJobStore implementations for every backend,
each paired with a createXxxQueue() factory. Backends no longer require
the wrapQueueStorage adapter for first-class usage.

SQLite (providers/sqlite):
  SqliteMessageQueue — receive() loops up to max claims; leaseMs forwarded
    to next(); subscribeToChanges() via polling.
  SqliteJobStore — delegates to SqliteQueueStorage core; saveStatus() via
    direct UPDATE (no attempts bump).
  createSqliteQueue(db, queueName, opts?) → { messageQueue, jobStore }

IndexedDB (packages/indexeddb):
  IndexedDbMessageQueue — same receive() loop; BroadcastChannel notify.
  IndexedDbJobStore — delegates to IndexedDbQueueStorage; saveStatus()
    via IDB get+put transaction (preserves attempts).
  createIndexedDbQueue(queueName, opts?) → { messageQueue, jobStore }

Postgres (providers/postgres):
  PostgresMessageQueue — receive() loop; LISTEN/NOTIFY via pg Pool.
  PostgresJobStore — delegates to PostgresQueueStorage; saveStatus() via
    parameterized UPDATE.
  createPostgresQueue(pool, queueName, opts?) → { messageQueue, jobStore }

Supabase (providers/supabase):
  SupabaseMessageQueue — receive() loop; polling-based subscription.
  SupabaseJobStore — delegates to SupabaseQueueStorage; saveStatus() via
    Supabase client .update().
  createSupabaseQueue(client, queueName, opts?) → { messageQueue, jobStore }
  Add Number.isFinite guards before raw-string SQL interpolation in
    leaseMs and extendLease ms parameters.
  Add ALTER COLUMN max_attempts SET DEFAULT 10 in the migration path after
    renaming max_retries so existing deployments match fresh installs.

All *MessageQueue and *JobStore types use provider-prefixed PendingWrite
aliases (SqlitePendingWrite, etc.) to avoid name collisions in the
@workglow/workglow meta-package barrel.

https://claude.ai/code/session_01MVz87icx5fozgdfEPSQih3
…ness fixes

Dead-letter queue (DLQ):
  Add DeadLetter<Input> type. JobQueueServer accepts an optional
  `deadLetter: IMessageQueue<DeadLetter<Input>> | "discard"` option.
  When a job exhausts maxAttempts, the worker forwards the original input,
  error, attempt count, queueName, and jobRunId to the DLQ before marking
  the job FAILED. "discard" suppresses forwarding entirely. Any
  IMessageQueue can serve as a DLQ including a second job queue.

Prefetch batching:
  Add `prefetch?: number` to JobQueueServerOptions (default 1). The server
  passes it through to the worker's receive() call so each poll iteration
  claims up to `prefetch` jobs in one round-trip instead of claiming one
  and immediately polling for the next.

disableJob implementation:
  Replace placeholder with real logic: fail the active claim (if any)
  then call jobStore.saveStatus(id, DISABLED) to persist the status
  without bumping attempts.

Correctness fixes absorbed from code review:
  - ILimiter: validateJobState() failures now call limiter.release(token)
    (not complete()) so RateLimiter window slots are not permanently
    consumed for jobs that never ran. A limiterReleased flag prevents the
    finally block from calling complete() a second time.
  - IndexedDbQueueStorage.abort(PENDING): use put() not complete() so
    aborting a pending job does not bump attempts.
  - wrapQueueStorage.saveStatus: delegate to storage.saveStatus() when
    present; throw for legacy storages that lack it. Removes the broken
    complete()+attempts-1 workaround (backends that compute attempts+1
    from stored state ignored the provided value).
  - IQueueStorage: add optional saveStatus?() so the wrap adapter can
    duck-type check without a cast.
  - SqliteQueueStorage.saveStatus: fix signature to async/Promise<void>.
  - JobStorageConverters: add includeLeaseOwner option alias alongside
    the deprecated includeWorkerId.
  - JobQueueWorker.getClaim: rename from requireClaim throughout.
  - JobQueueWorker.releaseClaim fallback: document that adapters whose
    claim.id differs from job.id (e.g. SQS receipt handles) should treat
    the fallback releaseClaim(job.id) call as best-effort no-op.
  - InMemoryJobQueue.test.ts: move mid-file `import type` to the
    top-level import block.

https://claude.ai/code/session_01MVz87icx5fozgdfEPSQih3
@sroussey sroussey force-pushed the claude/standardize-job-queue-1kQBU branch from 9bcc79c to 196c211 Compare May 18, 2026 22:36
claude added 4 commits May 18, 2026 23:26
…model

Split subscribeToChangesBlock into subscribeToChanges.eventDriven (strict
commit order) and subscribeToChanges.polling (set equality + count).
Promote usesPolling to required when supportsSubscriptions: true via a
discriminated TabularStorageContractOpts union. Add a meta-test covering
the failure modes each block is designed to catch.

https://claude.ai/code/session_01MVz87icx5fozgdfEPSQih3
Add optional onRunStart(scope) to IDisposeStrategy and ResourceScope.runStart().
Both TaskRunner.handleStart and TaskGraphRunner.handleStart await scope.runStart()
before user code. InactivityStrategy.onRunStart clears all pending timers,
closing the race where a timer armed at runComplete could fire mid-next-run
and dispose a resource the new run was about to use. InactivityStrategy.onRegister
additionally clears any pending timer for the re-registered key.

https://claude.ai/code/session_01MVz87icx5fozgdfEPSQih3
… lease-expiry attempt bump

Migrations (H3/C1):
  Restore postgres and sqlite v1 migrations byte-for-byte to the pre-PR
  shape (run_after / run_attempts / max_retries / last_ran_at / worker_id
  and run_after-keyed indexes). v2 adds abort_requested_at / lease_expires_at.
  v3 carries the renames and index swap (drops run_after-keyed indexes,
  recreates them keyed on visible_at) guarded by IF EXISTS (postgres) /
  PRAGMA table_info (sqlite) so fresh installs and upgraded installs land
  on the same schema.
  IndexedDB gets a v2 migration that drops the old queue_status_run_after
  index, cursor-walks every row copying run_after → visible_at, and
  recreates queue_status_visible_at. Synchronous inside the IDB upgrade
  transaction (no awaits between requests).
  Add a parity integration test that walks v1→v2→v3 and asserts the final
  schema matches a fresh install for both postgres and sqlite.

Abort/retry correctness (H1/H4):
  All backends (InMemory, SQLite, Postgres, Supabase, IndexedDB, and the
  wrapQueueStorage adapter) now clear abort_requested_at on:
    (a) complete() PENDING-retry branch
    (b) releaseClaim()
    (c) next() lease-expiry reclaim
  The next() lease-expiry reclaim additionally increments attempts via
  CASE WHEN status = 'PROCESSING' THEN attempts + 1 ELSE attempts END
  so a crashed-worker lease leak terminates via MAX_ATTEMPTS_REACHED
  instead of looping forever. PENDING claims do not bump.

https://claude.ai/code/session_01MVz87icx5fozgdfEPSQih3
finalize() — non-incrementing terminal write (C2/M4):
  Add IQueueStorage.finalize(id, fields) as a partial-overwrite that sets
  output/error/error_code/status/completed_at/abort_requested_at without
  touching attempts. Implement across all backends. WrappedClaim.ack and
  WrappedClaim.fail now call finalize instead of complete, removing the
  double-attempts-increment that occurred when a job settled after a
  lease-expiry reclaim had already charged the attempt.
  InMemoryQueueStorage.updateJobStatus renamed to saveStatus to align with
  the IQueueStorage optional interface.

Atomic ack/fail with result/error args (H2):
  IClaim.ack accepts optional result; IClaim.fail accepts
  { error, errorCode, abortRequested, permanent }. completeJob calls
  claim.ack(output ?? null) and failJob calls claim.fail({...}) directly,
  eliminating the two-write window between jobStore.saveResult and
  claim.ack where a crash left a PROCESSING row with output written but
  status not updated. IJobStore.saveResult/saveError marked @deprecated
  and kept as buffered no-ops for one minor release.

Atomic disableJob (H5):
  Add optional IClaim.disable() and extend finalize() to also write
  lease_owner and progress fields. disableJob now performs a single
  storage write setting status=DISABLED, releasing the lease, and
  clearing progress — no error/error_code. Replaces the legacy two-write
  path that briefly persisted FAILED before DISABLED and could emit a
  spurious job_error event to subscribers.

https://claude.ai/code/session_01MVz87icx5fozgdfEPSQih3
@sroussey sroussey closed this May 18, 2026
@sroussey sroussey reopened this May 18, 2026
@sroussey sroussey merged commit 69c1bd1 into main May 18, 2026
18 checks passed
sroussey added a commit that referenced this pull request May 19, 2026
* fix(job-queue): RateLimiter slot leak, disableJob dispatch, pre-execute abort check

Three follow-up fixes to PR #511 that the commit message claimed but
that did not land in the code:

1. RateLimiter slot leak (CRITICAL): validateJobState() failures
   now call limiter.release() (not complete()) before rethrow, and
   the outer finally is gated on a limiterReleased flag so the slot
   is not double-decremented. Without this, every DEADLINE-EXCEEDED
   or pre-aborted job permanently consumed a RateLimiter window slot.

2. disableJob dispatch (HIGH): JobDisabledError now routes through
   disableJob() instead of falling into failJob(), so attempting to
   disable a job no longer clobbers the DISABLED status with FAILED.
   The H5 atomic-disable code path is now reachable.

3. Pre-execute abort check (HIGH): createAbortController(job.id) is
   moved before validateJobState() so the pre-execute abort flag
   check at the top of validateJobState (which reads
   activeJobAbortControllers) is reachable. Previously the controller
   was created after validation, making the branch dead code.

Adds JobQueueWorker.test.ts with regressions for all three.

* fix(job-queue,indexeddb): abort(PENDING) attempts bump + v2 migration backfill

Two follow-up fixes to PR #511:

1. IndexedDbQueueStorage.abort(PENDING) (CRITICAL): replaced
   `this.complete(job)` with a direct `put()` that sets status=FAILED,
   abort_requested_at, completed_at WITHOUT bumping attempts. Matches
   the cross-backend contract verified in
   InMemoryQueueStorage/PostgresQueueStorage and asserted in a new
   genericJobQueueTests case.

2. IndexedDB v2 migration (CRITICAL): v2 previously copied only
   run_after → visible_at, leaving run_attempts, last_ran_at,
   max_retries, worker_id orphaned on upgrade. The storage layer
   reads the post-rename names, so existing browser-deployed queues
   silently lost retry budgets, last-attempt timestamps, and lease
   ownership on first migration. Extend the cursor body to migrate
   all five renames idempotently. Move the queue_status_visible_at
   createIndex into the terminal cursor branch so the index is built
   off post-migration rows.

Adds IndexedDbQueueMigrations.integration test for the five-rename
case and a cross-backend abort(PENDING) attempts-stability assertion.

* fix(sqlite): v3 max_attempts default = 10 to match Postgres parity

PR #511 lowered the default retry budget from 20 (Postgres) / 23
(SQLite v1) to 10. Postgres v3 explicitly applied
`ALTER COLUMN max_attempts SET DEFAULT 10`; SQLite v3 renamed the
column but did not adjust the default. Fresh SQLite installs ended
up at default 23, Postgres at 10, so callers omitting maxAttempts
got divergent retry behavior across backends.

SQLite has no `ALTER COLUMN ... SET DEFAULT` syntax, so the fix uses
the documented 12-step table-rebuild procedure
(https://www.sqlite.org/lang_altertable.html#otheralter): build the
new CREATE TABLE statement from the post-rename PRAGMA table_info,
swap max_attempts's default literal, copy rows, drop old, rename
new, recreate indexes. The rebuild is gated on the current default
not already being '10' so re-running v3 is idempotent.

Extend the migrations parity integration test to compare DEFAULTS,
not just column names, so future drift is caught.

* fix(job-queue): validate leaseMs / extendLease ms inputs across all backends

PR #511 added Number.isFinite guards to Supabase only; Postgres,
SQLite, InMemoryQueueStorage, and IndexedDB passed leaseMs / ms
directly into new Date(Date.now() + ms).toISOString() (yields
"Invalid Date" for NaN/Infinity, poisoning the row) or into
parameterized SQL fragments (runtime error for non-finite). A
negative leaseMs immediately re-expires the lease a worker just
claimed.

Extract validation into a shared validateLeaseMs() helper in
@workglow/job-queue; call it at the top of every next() and
extendLease() across all 5 backends. Migrate Supabase from its
inline Error throw to the shared RangeError so all backends report
the same exception type. ms === 0 remains valid (instant expiry).

Adds a cross-backend contract test that runs against every backend
via the shared genericJobQueueTests harness.

* fix(sqlite): harden v3 rebuild and tighten follow-up tests

Agent-Logs-Url: https://github.com/workglow-dev/libs/sessions/a1392ebd-9668-440a-a244-8a70762ce7de

Co-authored-by: sroussey <127349+sroussey@users.noreply.github.com>

* test(sqlite): share canonical v3 schema helper

Agent-Logs-Url: https://github.com/workglow-dev/libs/sessions/a1392ebd-9668-440a-a244-8a70762ce7de

Co-authored-by: sroussey <127349+sroussey@users.noreply.github.com>

* fix(sqlite): tighten schema helper typing

Agent-Logs-Url: https://github.com/workglow-dev/libs/sessions/a1392ebd-9668-440a-a244-8a70762ce7de

Co-authored-by: sroussey <127349+sroussey@users.noreply.github.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: sroussey <127349+sroussey@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants